import psycopg2
from prefect import flow, task

"""
创建到高斯数据库的连接
"""
def create_connection():
    try:
        conn = psycopg2.connect(
            host="xxx.xxx.xxx.xxx",  # 替换为实际主机地址
            port="xxxx",  # 替换为实际端口号
            database="xxxxxx",  # 替换为实际数据库名
            user="xxx",  # 替换为实际用户名
            password="xxxxxx"  # 替换为实际密码
        )
        return conn
    except psycopg2.Error as e:
        print(f"数据库连接出错: {e}")

"""
插入表数据
"""
@task
def insert_data():
    conn = create_connection()
    cursor = conn.cursor()
    try:
        # 插入两条数据记录
        cursor.execute("INSERT INTO xjw.prefect_test (name, age) VALUES ('xiaoming', 30)")
        cursor.execute("INSERT INTO xjw.prefect_test (name, age) VALUES ('xiaohong', 20)")
        conn.commit()
        print("插入成功")
    except psycopg2.Error as e:
        print(f"插入数据出错: {e}")
    finally:
        cursor.close()
        conn.close()

"""
修改表数据
"""
@task
def update_data():
    conn = create_connection()
    cursor = conn.cursor()
    try:
        # 修改表数据
        cursor.execute("UPDATE xjw.prefect_test SET name = 'xiaoxiong' WHERE name = 'xiaoming'")
        conn.commit()
        print("更新成功")
    except psycopg2.Error as e:
        print(f"更新数据出错: {e}")
    finally:
        cursor.close()
        conn.close()

"""
删除表数据
"""
@task
def delete_data():
    conn = create_connection()
    cursor = conn.cursor()
    try:
        # 删除表数据
        cursor.execute("DELETE FROM xjw.prefect_test WHERE name = 'xiaohong'")
        conn.commit()
        print("删除成功")
    except psycopg2.Error as e:
        print(f"删除数据出错: {e}")
    finally:
        cursor.close()
        conn.close()
        
"""
创建测试表
"""
@task
def create_table():
    conn = create_connection()
    cursor = conn.cursor()
    try:
        # 创建测试表（如果不存在），这里表名为prefect_test，包含id（自增主键）、name字段和age字段
        cursor.execute("CREATE TABLE IF NOT EXISTS xjw.prefect_test (id SERIAL PRIMARY KEY, name VARCHAR(255), age INT)")
    except psycopg2.Error as e:
        print(f"建表出错: {e}")
    finally:
        cursor.close()
        conn.close()

"""
查询表数据
"""
@task
def query_data():
    conn = create_connection()
    cursor = conn.cursor()
    try:
        cursor.execute("SELECT * FROM xjw.prefect_test")
        rows = cursor.fetchall()
        for row in rows:
            print(row)
    except psycopg2.Error as e:
        print(f"查询数据出错: {e}")
    finally:
        cursor.close()
        conn.close()

@flow
def database_operations_flow():
    create_table()
    insert_data()
    query_data()
    update_data()
    query_data()
    delete_data()
    query_data()

if __name__ == '__main__':
    database_operations_flow()

